Spark Dataset APIについて | Hadoop Advent Calendar 2016 #17
こんにちは、小澤です。 この記事はHadoop Advent Calendar 17日目のものとなります。
前回はSpark SQLとDataFrame APIについて紹介させていただきました。
今回はSparkのDataset APIというものについて書かせていただきます。
Dataset APIについて
Datasetは現在の最新版である2.0の一つ前の1.6から導入された機能であるため、Sparkの他のコンポーネントと比較してご存じない方も多いのではないでしょうか?
Dataset APIはRDDを使用した際の柔軟性とDataFrameを利用した際の利便性を兼ね備えた仕組みとして作られました。 Dataset APIの要件として挙げられているものをざっくりと説明すると、
といったものになります。より詳細な情報はJiraのチケットをご確認ください(英語です)。
Dataset APIの利用
Dataset APIは現在のところ、ScalaとJavaのみAPIを提供しています。
PythonとRに関しては言語の性質上Dataset APIによって得られるメリットのほとんどがDataFrameですでに達成しているという見解のようです(参考: 公式ドキュメント)
今回はScalaから利用したいと思います。
SparkSessionの作成
DataFrame API同様、Dataset APIも2.0以降であればSparkSession、1.6であればSQLContextをまずは作成します。 一番最後のimportを忘れないように注意しましょう。
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Dataset API") .getOrCreate() import spark.implicits._
Datasetの作成
ここから先はspark-shellでの実行例となります。
ScalaのSeqから
まずは一番簡単な例としてScalaのSeqからDatasetを作成してみます
scala> val ds = Seq(1, 2, 3, 4, 5).toDS() scala> ds res0: org.apache.spark.sql.Dataset[Int] = [value: int] scala> ds.show() +-----+ |value| +-----+ | 1| | 2| | 3| | 4| | 5| +-----+
Int型のDatasetになっていることと、DataFrameのようにshowメソッドでデータが確認できることがわかるかと思います。
RDDから
RDDからも同様にtoDS()が使えまが、こちらはSparkSessionを利用しても作成できます。
scala> val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24 scala> rdd.toDS() res3: org.apache.spark.sql.Dataset[Int] = [value: int] scala> spark.createDataset(rdd) res4: org.apache.spark.sql.Dataset[Int] = [value: int]
どちらも特に問題ないかと思います。
DataFrameから
DataFrameからDatasetを作るには型の情報が必要なのでまずクラスを作成ます。
作成したクラスを型として指定してDatasetを作成ます。
今回はcase classを利用してRDDからDataFrame作成し、Dataset作成時の型指定には利用したcase classを指定します。
scala> case class Iris(sepal_length: Double, sepal_width: Double, petal_length: Double, petal_width: Double, species: String) defined class Iris scala> val df = sc.textFile("iris").map {line => | val item = line.split(",") | Iris(item(0).toDouble, item(1).toDouble, item(2).toDouble, item(3).toDouble, item(4)) | }.toDF() df: org.apache.spark.sql.DataFrame = [sepal_length: double, sepal_width: double ... 3 more fields] scala> df.show(3) +------------+-----------+------------+-----------+-----------+ |sepal_length|sepal_width|petal_length|petal_width| species| +------------+-----------+------------+-----------+-----------+ | 5.1| 3.5| 1.4| 0.2|Iris-setosa| | 4.9| 3.0| 1.4| 0.2|Iris-setosa| | 4.7| 3.2| 1.3| 0.2|Iris-setosa| +------------+-----------+------------+-----------+-----------+ only showing top 3 rows scala> val ds = df.as[Iris] ds: org.apache.spark.sql.Dataset[Iris] = [sepal_length: double, sepal_width: double ... 3 more fields] scala> ds.show(3) +------------+-----------+------------+-----------+-----------+ |sepal_length|sepal_width|petal_length|petal_width| species| +------------+-----------+------------+-----------+-----------+ | 5.1| 3.5| 1.4| 0.2|Iris-setosa| | 4.9| 3.0| 1.4| 0.2|Iris-setosa| | 4.7| 3.2| 1.3| 0.2|Iris-setosa| +------------+-----------+------------+-----------+-----------+ only showing top 3 rows
Datasetの利用
Datasetを使う際のAPIは基本的にDataFrameと一緒です。以下はfilterの例です。
scala> ds.filter(_.species == "Iris-setosa").show(3) +------------+-----------+------------+-----------+-----------+ |sepal_length|sepal_width|petal_length|petal_width| species| +------------+-----------+------------+-----------+-----------+ | 5.1| 3.5| 1.4| 0.2|Iris-setosa| | 4.9| 3.0| 1.4| 0.2|Iris-setosa| | 4.7| 3.2| 1.3| 0.2|Iris-setosa| +------------+-----------+------------+-----------+-----------+ only showing top 3 rows
DataFrameのとの違いとして、selectなどをする際の型の指定があります。 これをつけないとDataFrameに変換されるようです。
scala> ds.select('species) res22: org.apache.spark.sql.DataFrame = [species: string] scala> ds.select('species.as[String]) res23: org.apache.spark.sql.Dataset[String] = [species: string]
DataFrameやRDDへの変換
これも簡単にできます。シームレスに変換できるようがわかりやすいかと思います。
scala> ds.rdd res25: org.apache.spark.rdd.RDD[Iris] = MapPartitionsRDD[22] at rdd at <console>:35 scala> ds.toDF() res26: org.apache.spark.sql.DataFrame = [sepal_length: double, sepal_width: double ... 3 more fields]
注意点としては、先ほどのDatasetの利用であげた通りかなり柔軟に対応してくれるのでDatasetを扱っていたつもりがいつの間にかDataFrameになっていたということがあるかと思います。
特に、タイプセーフを期待して使っている場合は、いつの間にかDataFrameになっており途中で間違った型を指定していたのにコンパイル時に気付けなかったということもあり得るかと思います。
終わりに
今回はDataset APIについて解説しました。 今すぐに必要になるという場面はまだ少ないかと思いますが、このような新しい機能も存在していることを知っておくといずれ役立つかもしれません。
明日はSparkで機械学習をするためのMLlibについて書かせていただく予定です。
ぜひ、お楽しみに!